//! The legacy HTTP Client from 0.14.x
//!
//! This `Client` will eventually be deconstructed into more composable parts.
//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
//! in much the same way it did in hyper 0.14.

use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;

use futures_util::future::{self, Either, FutureExt, TryFutureExt};
use http::uri::Scheme;
use hyper::body::Body;
use hyper::client::conn::TrySendError as ConnTrySendError;
use hyper::header::{HeaderValue, HOST};
use hyper::rt::Timer;
use hyper::{Method, Request, Response, Uri, Version};
use tracing::{debug, trace, warn};

use super::connect::capture::CaptureConnectionExtension;
use super::connect::{Alpn, Connect, Connected, Connection};
use super::pool::{self, Ver};
use crate::common::future::poll_fn;
use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};

/// Boxed, pinned, `Send` future with no output — the shape required by the
/// `hyper::rt::Executor` bound used throughout this module.
type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B, PK: pool::Key = DefaultPoolKey> {
	// Request/retry behavior knobs shared by all clones.
	config: Config,
	// Transport-level connector used to open new connections.
	connector: C,
	// Executor used to spawn background connection-driving tasks.
	exec: Exec,
	// Per-connection HTTP/1 settings applied at handshake time.
	#[cfg(feature = "http1")]
	h1_builder: hyper::client::conn::http1::Builder,
	// Per-connection HTTP/2 settings applied at handshake time.
	#[cfg(feature = "http2")]
	h2_builder: hyper::client::conn::http2::Builder<Exec>,
	// Idle-connection pool, keyed by `PK` (scheme + authority by default).
	pool: pool::Pool<PoolClient<B>, PK>,
}

/// Internal client configuration, copied into every clone of `Client`.
#[derive(Clone, Copy, Debug)]
struct Config {
	// Whether requests canceled before being written to a pooled connection
	// may be retried on another connection.
	retry_canceled_requests: bool,
	// Whether to automatically insert a `Host` header on HTTP/1 requests.
	set_host: bool,
	// Preferred HTTP version policy (auto-negotiate vs. HTTP/2 only).
	ver: Ver,
}

/// Client errors
pub struct Error {
	// Broad category of the failure.
	kind: ErrorKind,
	// Optional underlying cause, surfaced via `std::error::Error::source`.
	source: Option<Box<dyn StdError + Send + Sync>>,
	// Connection metadata captured at failure time, when available.
	#[cfg(any(feature = "http1", feature = "http2"))]
	connect_info: Option<Connected>,
}

/// Broad classification of client failures.
#[derive(Debug)]
enum ErrorKind {
	// The request was canceled before it could be dispatched.
	Canceled,
	// The connection's request channel closed unexpectedly.
	ChannelClosed,
	// Establishing (or checking out) a connection failed.
	Connect,
	// Dispatching the request on an established connection failed.
	SendRequest,
	// The request carried no pool-key extension to route it with.
	NoPoolKey,
}

// Shorthand for building an `Error` of the given `ErrorKind`, optionally
// wrapping a source error. NOTE(review): both arms always initialize
// `connect_info`, which is cfg-gated on "http1"/"http2" in `Error` — this
// macro presumably only expands where one of those features is enabled;
// confirm against the module's cfg gates.
macro_rules! e {
	($kind:ident) => {
		Error {
			kind: ErrorKind::$kind,
			source: None,
			connect_info: None,
		}
	};
	($kind:ident, $src:expr) => {
		Error {
			kind: ErrorKind::$kind,
			source: Some($src.into()),
			connect_info: None,
		}
	};
}

// Default pool key: connections are reused per (scheme, authority) pair.
// We might change this... :shrug:
type DefaultPoolKey = (http::uri::Scheme, http::uri::Authority);

/// Internal result of one send attempt, distinguishing retryable failures
/// (the request is handed back, unsent) from terminal ones.
#[allow(clippy::large_enum_variant)]
enum TrySendError<B> {
	Retryable {
		error: Error,
		// The original request, returned unsent so it can be retried.
		req: Request<B>,
		// Whether the failing connection came from the pool; a failure on a
		// brand-new connection is never retried.
		connection_reused: bool,
	},
	Nope(Error),
}

/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
	// Boxed response future; `SyncWrapper` makes this type `Sync` without
	// requiring the inner future to be.
	#[allow(clippy::type_complexity)]
	inner: SyncWrapper<
		Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
	>,
}

// ===== impl Client =====

impl Client<(), ()> {
	/// Create a builder to configure a new `Client`.
	///
	/// The given executor is used to spawn background connection-driving
	/// and dispatch tasks.
	///
	/// # Example
	///
	/// ```
	/// # #[cfg(feature = "tokio")]
	/// # fn run () {
	/// use std::time::Duration;
	/// use hyper_util::client::legacy::Client;
	/// use hyper_util::rt::{TokioExecutor, TokioTimer};
	///
	/// let client = Client::builder(TokioExecutor::new())
	///     .pool_timer(TokioTimer::new())
	///     .pool_idle_timeout(Duration::from_secs(30))
	///     .http2_only(true)
	///     .build_http();
	/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
	/// # drop(infer);
	/// # }
	/// # fn main() {}
	/// ```
	pub fn builder<E>(executor: E) -> Builder
	where
		E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
	{
		Builder::new(executor)
	}
}

impl<C, B, PK> Client<C, B, PK>
where
	C: Connect + Clone + Send + Sync + 'static,
	B: Body + Send + 'static + Unpin,
	B::Data: Send,
	B::Error: Into<Box<dyn StdError + Send + Sync>>,
	PK: pool::Key,
{
	/// Send a `GET` request to the supplied `Uri`.
	///
	/// # Note
	///
	/// This requires that the `Body` type have a `Default` implementation.
	/// It *should* return an "empty" version of itself, such that
	/// `Body::is_end_stream` is `true`.
	///
	/// # Example
	///
	/// ```
	/// # #[cfg(feature = "tokio")]
	/// # fn run () {
	/// use hyper::Uri;
	/// use hyper_util::client::legacy::Client;
	/// use hyper_util::rt::TokioExecutor;
	/// use bytes::Bytes;
	/// use http_body_util::Full;
	///
	/// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
	///
	/// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
	/// # }
	/// # fn main() {}
	/// ```
	pub fn get(&self, uri: Uri) -> ResponseFuture
	where
		B: Default,
	{
		// A well-behaved `Default` body should already be at end-of-stream,
		// since a GET carries no payload; flag suspicious implementations.
		let empty_body = B::default();
		if !empty_body.is_end_stream() {
			warn!("default Body used for get() does not return true for is_end_stream");
		}

		// Assemble a request with the default method (GET) targeting `uri`
		// and dispatch it like any other request.
		let mut request = Request::new(empty_body);
		*request.uri_mut() = uri;
		self.request(request)
	}

	/// Send a constructed `Request` using this `Client`.
	///
	/// # Example
	///
	/// ```
	/// # #[cfg(feature = "tokio")]
	/// # fn run () {
	/// use hyper::{Method, Request};
	/// use hyper_util::client::legacy::Client;
	/// use http_body_util::Full;
	/// use hyper_util::rt::TokioExecutor;
	/// use bytes::Bytes;
	///
	/// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
	///
	/// let req: Request<Full<Bytes>> = Request::builder()
	///     .method(Method::POST)
	///     .uri("http://httpbin.org/post")
	///     .body(Full::from("Hallo!"))
	///     .expect("request builder");
	///
	/// let future = client.request(req);
	/// # }
	/// # fn main() {}
	/// ```
	pub fn request(&self, req: Request<B>) -> ResponseFuture {
		// Clone the (cheap) client handle so the returned future owns all
		// it needs, independent of `self`'s lifetime.
		let client = self.clone();
		ResponseFuture::new(client.send_request(req))
	}

	/// Drive a request to completion, transparently retrying requests that
	/// were canceled before ever being written to a connection (when
	/// retries are enabled and the failed connection was a pooled reuse).
	async fn send_request(
		self,
		mut req: Request<B>,
	) -> Result<Response<hyper::body::Incoming>, Error> {
		// We may change URI so clone it to keep the original
		let uri = req.uri().clone();

		loop {
			req = match self.try_send_request(req).await {
				Ok(resp) => return Ok(resp),
				Err(TrySendError::Nope(err)) => return Err(err),
				Err(TrySendError::Retryable {
					mut req,
					error,
					connection_reused,
				}) => {
					if !self.config.retry_canceled_requests || !connection_reused {
						// if client disabled, don't retry
						// a fresh connection means we definitely can't retry
						return Err(error);
					}

					trace!(
						"unstarted request canceled, trying again (reason={:?})",
						error
					);
					// Restore the original URI: `try_send_request` may have
					// rewritten it (origin/absolute/authority form) for the
					// connection that just failed.
					*req.uri_mut() = uri.clone();
					req
				},
			}
		}
	}

	/// Attempt one send of `req` on a (possibly pooled) connection,
	/// adapting the request (version, `Host` header, URI form) to the
	/// connection's protocol. Returns the request back inside
	/// `TrySendError::Retryable` if it was canceled before being written.
	async fn try_send_request(
		&self,
		req: Request<B>,
	) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
		let (parts, body) = req.into_parts();
		let mut pooled = self
			.connection_for(&parts)
			.await
			// `connection_for` already retries checkout errors, so if
			// it returns an error, there's not much else to retry
			.map_err(TrySendError::Nope)?;
		let mut req = Request::from_parts(parts, body);

		// Report the resolved connection to any caller that asked to
		// capture it via the request extensions.
		if let Some(conn) = req.extensions_mut().get_mut::<CaptureConnectionExtension>() {
			conn.set(&pooled.conn_info);
		}

		if pooled.is_http1() {
			if req.version() == Version::HTTP_2 {
				// This means we negotiated down in ALPN
				*req.version_mut() = Version::HTTP_11;
				trace!("Connection is HTTP/1, but request was HTTP/2");
			}

			// Fill in a `Host` header (respecting an existing one) from the
			// request URI, adding the port only when it's non-default.
			if self.config.set_host {
				let uri = req.uri().clone();
				req.headers_mut().entry(HOST).or_insert_with(|| {
					let hostname = uri.host().expect("authority implies host");
					if let Some(port) = get_non_default_port(&uri) {
						let s = format!("{hostname}:{port}");
						HeaderValue::from_str(&s)
					} else {
						HeaderValue::from_str(hostname)
					}
					.expect("uri host is valid header value")
				});
			}

			// CONNECT always sends authority-form, so check it first...
			if req.method() == Method::CONNECT {
				authority_form(req.uri_mut());
			} else if pooled.conn_info.is_proxied {
				absolute_form(req.uri_mut());
			} else {
				origin_form(req.uri_mut());
			}
		} else if req.method() == Method::CONNECT && !pooled.is_http2() {
			authority_form(req.uri_mut());
		}

		let mut res = match pooled.try_send_request(req).await {
			Ok(res) => res,
			Err(mut err) => {
				// If hyper handed the request back, it was never dispatched
				// and is safe to retry on another connection.
				return if let Some(req) = err.take_message() {
					Err(TrySendError::Retryable {
						connection_reused: pooled.is_reused(),
						error: e!(Canceled, err.into_error()).with_connect_info(pooled.conn_info.clone()),
						req,
					})
				} else {
					Err(TrySendError::Nope(
						e!(SendRequest, err.into_error()).with_connect_info(pooled.conn_info.clone()),
					))
				}
			},
		};

		// If the Connector included 'extra' info, add to Response...
		if let Some(extra) = &pooled.conn_info.extra {
			extra.set(res.extensions_mut());
		}

		// If pooled is HTTP/2, we can toss this reference immediately.
		//
		// when pooled is dropped, it will try to insert back into the
		// pool. To delay that, spawn a future that completes once the
		// sender is ready again.
		//
		// This *should* only be once the related `Connection` has polled
		// for a new request to start.
		//
		// It won't be ready if there is a body to stream.
		if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
			drop(pooled);
		} else if !res.body().is_end_stream() {
			// let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
			// res.body_mut().delayed_eof(delayed_rx);
			let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
				// At this point, `pooled` is dropped, and had a chance
				// to insert into the pool (if conn was idle)
				// drop(delayed_tx);
			});

			self.exec.execute(on_idle);
		} else {
			// There's no body to delay, but the connection isn't
			// ready yet. Only re-insert when it's ready
			let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());

			self.exec.execute(on_idle);
		}

		Ok(res)
	}

	/// Obtain a ready connection for `dst`, retrying when the pool checkout
	/// was closed before delivering (and retries are enabled).
	async fn connection_for(
		&self,
		dst: &http::request::Parts,
	) -> Result<pool::Pooled<PoolClient<B>, PK>, Error> {
		loop {
			match self.one_connection_for(dst).await {
				Ok(pooled) => return Ok(pooled),
				Err(ClientConnectError::Normal(err)) => return Err(err),
				Err(ClientConnectError::CheckoutIsClosed(reason)) => {
					if !self.config.retry_canceled_requests {
						// Retries disabled: surface the checkout failure as
						// a connect error.
						return Err(e!(Connect, reason));
					}

					trace!(
						"unstarted request canceled, trying again (reason={:?})",
						reason,
					);
					continue;
				},
			};
		}
	}

	/// Make a single attempt to obtain a connection for `dst`, racing a
	/// pool checkout against a fresh connect.
	///
	/// The correctness of this function depends on the argument order of
	/// `future::select` below — see the inline comments.
	async fn one_connection_for(
		&self,
		dst: &http::request::Parts,
	) -> Result<pool::Pooled<PoolClient<B>, PK>, ClientConnectError> {
		// Return a single connection if pooling is not enabled

		let Some(pool_key) = dst.extensions.get::<PK>() else {
			return Err(ClientConnectError::Normal(e!(NoPoolKey)));
		};
		if !self.pool.is_enabled() {
			return self
				.connect_to(dst, pool_key)
				.await
				.map_err(ClientConnectError::Normal);
		}

		// This actually races 2 different futures to try to get a ready
		// connection the fastest, and to reduce connection churn.
		//
		// - If the pool has an idle connection waiting, that's used
		//   immediately.
		// - Otherwise, the Connector is asked to start connecting to
		//   the destination Uri.
		// - Meanwhile, the pool Checkout is watching to see if any other
		//   request finishes and tries to insert an idle connection.
		// - If a new connection is started, but the Checkout wins after
		//   (an idle connection became available first), the started
		//   connection future is spawned into the runtime to complete,
		//   and then be inserted into the pool as an idle connection.
		let checkout = self.pool.checkout(pool_key.clone());
		let connect = self.connect_to(dst, pool_key);
		let is_ver_h2 = self.config.ver == Ver::Http2;

		// The order of the `select` is depended on below...

		match future::select(checkout, connect).await {
			// Checkout won, connect future may have been started or not.
			//
			// If it has, let it finish and insert back into the pool,
			// so as to not waste the socket...
			Either::Left((Ok(checked_out), connecting)) => {
				// This depends on the `select` above having the correct
				// order, such that if the checkout future were ready
				// immediately, the connect future will never have been
				// started.
				//
				// If it *wasn't* ready yet, then the connect future will
				// have been started...
				if connecting.started() {
					let bg = connecting
						.map_err(|err| {
							trace!("background connect error: {}", err);
						})
						.map(|_pooled| {
							// dropping here should just place it in
							// the Pool for us...
						});
					// An execute error here isn't important, we're just trying
					// to prevent a waste of a socket...
					self.exec.execute(bg);
				}
				Ok(checked_out)
			},
			// Connect won, checkout can just be dropped.
			Either::Right((Ok(connected), _checkout)) => Ok(connected),
			// Either checkout or connect could get canceled:
			//
			// 1. Connect is canceled if this is HTTP/2 and there is
			//    an outstanding HTTP/2 connecting task.
			// 2. Checkout is canceled if the pool cannot deliver an
			//    idle connection reliably.
			//
			// In both cases, we should just wait for the other future.
			Either::Left((Err(err), connecting)) => {
				if err.is_canceled() {
					connecting.await.map_err(ClientConnectError::Normal)
				} else {
					Err(ClientConnectError::Normal(e!(Connect, err)))
				}
			},
			Either::Right((Err(err), checkout)) => {
				if err.is_canceled() {
					checkout.await.map_err(move |err| {
						if is_ver_h2 && err.is_canceled() {
							ClientConnectError::CheckoutIsClosed(err)
						} else {
							ClientConnectError::Normal(e!(Connect, err))
						}
					})
				} else {
					Err(ClientConnectError::Normal(err))
				}
			},
		}
	}

	/// Returns a lazy future that, when started, dials `dst` via the
	/// connector, performs the HTTP/1 or HTTP/2 handshake (as negotiated by
	/// ALPN and the request version), and yields a pooled client registered
	/// under pool key `pk`.
	#[cfg(any(feature = "http1", feature = "http2"))]
	fn connect_to(
		&self,
		dst: &http::request::Parts,
		pk: &PK,
	) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PK>, Error>> + Send + Unpin + use<C, B, PK>
	{
		let executor = self.exec.clone();
		let pool = self.pool.clone();
		#[cfg(feature = "http1")]
		let h1_builder = self.h1_builder.clone();
		#[cfg(feature = "http2")]
		let h2_builder = self.h2_builder.clone();
		// NOTE(review): `self.config.ver` is deliberately not consulted here
		// (the original `let ver = self.config.ver;` is disabled); only the
		// request's own version selects HTTP/2 — confirm this is intended.
		let ver = if dst.version == Version::HTTP_2 {
			Ver::Http2
		} else {
			Ver::Auto
		};
		let is_ver_h2 = ver == Ver::Http2;
		let connector = self.connector.clone();
		// TODO: it would be nice to just pass pk directly, but the tower::Service makes this tricky to make
		// it generic.
		let mut ext = http::Extensions::new();
		let pkc = pk.clone();
		ext.insert(pkc);
		let pkc = pk.clone();
		hyper_lazy(move || {
			// Try to take a "connecting lock".
			//
			// If the pool_key is for HTTP/2, and there is already a
			// connection being established, then this can't take a
			// second lock. The "connect_to" future is Canceled.
			let connecting = match pool.connecting(pkc, ver) {
				Some(lock) => lock,
				None => {
					let canceled = e!(Canceled);
					// TODO
					// crate::Error::new_canceled().with("HTTP/2 connection in progress");
					return Either::Right(future::err(canceled));
				},
			};
			Either::Left(
				connector
					.connect(super::connect::sealed::Internal, ext)
					.map_err(|src| e!(Connect, src))
					.and_then(move |io| {
						let connected = io.connected();
						// If ALPN is h2 and we aren't http2_only already,
						// then we need to convert our pool checkout into
						// a single HTTP2 one.
						let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
							match connecting.alpn_h2(&pool) {
								Some(lock) => {
									trace!("ALPN negotiated h2, updating pool");
									lock
								},
								None => {
									// Another connection has already upgraded,
									// the pool checkout should finish up for us.
									let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
									return Either::Right(future::err(canceled));
								},
							}
						} else {
							connecting
						};

						#[cfg_attr(not(feature = "http2"), allow(unused))]
						let is_h2 = (is_ver_h2 && connected.alpn == Alpn::None) || connected.alpn == Alpn::H2;

						Either::Left(Box::pin(async move {
							let tx = if is_h2 {
								#[cfg(feature = "http2")]
								{
									let (mut tx, conn) = h2_builder.handshake(io).await.map_err(Error::tx)?;

									trace!("http2 handshake complete, spawning background dispatcher task");
									executor.execute(
										conn
											.map_err(|e| debug!("client connection error: {}", e))
											.map(|_| ()),
									);

									// Wait for 'conn' to ready up before we
									// declare this tx as usable
									tx.ready().await.map_err(Error::tx)?;
									PoolTx::Http2(tx)
								}
								#[cfg(not(feature = "http2"))]
								panic!("http2 feature is not enabled");
							} else {
								#[cfg(feature = "http1")]
								{
									// HTTP/1.1 handshake: `tx` sends requests and
									// `conn` drives the connection (incl. upgrades).
									let (mut tx, conn) = h1_builder
										.handshake(io)
										.await
										.map_err(crate::client::legacy::client::Error::tx)?;

									trace!("http1 handshake complete, spawning background dispatcher task");
									// Oneshot channel used to recover the real
									// connection error when `tx` only reports a
									// vague "channel closed".
									let (err_tx, err_rx) = tokio::sync::oneshot::channel();
									// Spawn the connection driver; any error it hits
									// is forwarded through `err_tx` (send failures
									// are ignored — the receiver may be dropped).
									executor.execute(
										conn
											.with_upgrades()
											.map_err(|e| {
												debug!("client connection error: {:?}", e);
												trace!("sending connection error to error channel");
												let _ = err_tx.send(e);
											})
											.map(|_| ()),
									);
									trace!("waiting for connection to be ready");
									// Wait for 'conn' to ready up before we
									// declare this tx as usable
									match tx.ready().await {
										Ok(_) => {
											trace!("connection is ready");
											// Sender is usable; the error channel is
											// no longer needed.
											drop(err_rx);
											PoolTx::Http1(tx)
										},
										// A closed channel may hide a more specific
										// connection error; prefer that if one was
										// reported by the driver task.
										Err(e) if e.is_closed() => {
											trace!("connection channel closed, checking for connection error");
											match err_rx.await {
												Ok(err) => {
													trace!("received connection error: {:?}", err);
													return Err(crate::client::legacy::client::Error::tx(err));
												},
												// No specific error arrived; fall back
												// to the vague ChannelClosed error.
												Err(_) => {
													trace!("error channel closed, returning the vague ChannelClosed error");
													return Err(crate::client::legacy::client::Error::tx(e));
												},
											}
										},
										// Other readiness failures (not channel
										// closure) are propagated as-is.
										Err(e) => {
											trace!("connection readiness failed: {:?}", e);
											return Err(crate::client::legacy::client::Error::tx(e));
										},
									}
								}
								#[cfg(not(feature = "http1"))]
								{
									panic!("http1 feature is not enabled");
								}
							};

							Ok(pool.pooled(
								connecting,
								PoolClient {
									conn_info: connected,
									tx,
								},
							))
						}))
					}),
			)
		})
	}
}

// Allows an owned `Client` to be used directly as a `tower` service.
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
	C: Connect + Clone + Send + Sync + 'static,
	B: Body + Send + 'static + Unpin,
	B::Data: Send,
	B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
	type Response = Response<hyper::body::Incoming>;
	type Error = Error;
	type Future = ResponseFuture;

	// Always ready: backpressure happens per-connection inside the pool,
	// not at the service level.
	fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	fn call(&mut self, req: Request<B>) -> Self::Future {
		self.request(req)
	}
}

// Same service impl for a shared reference, so callers need not clone the
// `Client` to use it as a `tower` service.
impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
where
	C: Connect + Clone + Send + Sync + 'static,
	B: Body + Send + 'static + Unpin,
	B::Data: Send,
	B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
	type Response = Response<hyper::body::Incoming>;
	type Error = Error;
	type Future = ResponseFuture;

	// Always ready: backpressure happens per-connection inside the pool.
	fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	fn call(&mut self, req: Request<B>) -> Self::Future {
		self.request(req)
	}
}

// Manual `Clone`: a derive would require `B: Clone`, which is unnecessary
// since only handles (config, executor, builders, connector, pool) are
// cloned — the pool itself is shared between clones.
impl<C: Clone, B, PK: pool::Key> Clone for Client<C, B, PK> {
	fn clone(&self) -> Client<C, B, PK> {
		Client {
			config: self.config,
			exec: self.exec.clone(),
			#[cfg(feature = "http1")]
			h1_builder: self.h1_builder.clone(),
			#[cfg(feature = "http2")]
			h2_builder: self.h2_builder.clone(),
			connector: self.connector.clone(),
			pool: self.pool.clone(),
		}
	}
}

impl<C, B> fmt::Debug for Client<C, B> {
	// Intentionally opaque: internals (pool, connector) are not `Debug`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.debug_struct("Client").finish()
	}
}

// ===== impl ResponseFuture =====

impl ResponseFuture {
	/// Box and wrap a response future behind the concrete `ResponseFuture`
	/// type; `SyncWrapper` makes it `Sync` without requiring the inner
	/// future to be.
	fn new<F>(value: F) -> Self
	where
		F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
	{
		Self {
			inner: SyncWrapper::new(Box::pin(value)),
		}
	}
}

impl fmt::Debug for ResponseFuture {
	// The inner boxed future is opaque; print a fixed placeholder.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.pad("Future<Response>")
	}
}

impl Future for ResponseFuture {
	type Output = Result<Response<hyper::body::Incoming>, Error>;

	// Delegate polling straight to the boxed inner future.
	fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
		self.inner.get_mut().as_mut().poll(cx)
	}
}

// ===== impl PoolClient =====

/// A pooled connection: the protocol-specific request sender plus the
/// connection metadata reported by the connector.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
	// Metadata (ALPN, proxy flag, extras, poison marker) from the connector.
	conn_info: Connected,
	// Version-specific request sender half.
	tx: PoolTx<B>,
}

/// Protocol-specific request sender halves.
enum PoolTx<B> {
	#[cfg(feature = "http1")]
	Http1(hyper::client::conn::http1::SendRequest<B>),
	#[cfg(feature = "http2")]
	Http2(hyper::client::conn::http2::SendRequest<B>),
}

impl<B> PoolClient<B> {
	/// Poll until the sender can accept another request.
	///
	/// HTTP/2 senders are multiplexed and reported always-ready here;
	/// readiness gating only applies to HTTP/1 connections.
	fn poll_ready(
		&mut self,
		#[allow(unused_variables)] cx: &mut task::Context<'_>,
	) -> Poll<Result<(), Error>> {
		match self.tx {
			#[cfg(feature = "http1")]
			PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
			#[cfg(feature = "http2")]
			PoolTx::Http2(_) => Poll::Ready(Ok(())),
		}
	}

	/// Whether this is an HTTP/1 connection (i.e. not HTTP/2).
	fn is_http1(&self) -> bool {
		!self.is_http2()
	}

	/// Whether this is an HTTP/2 connection.
	fn is_http2(&self) -> bool {
		match self.tx {
			#[cfg(feature = "http1")]
			PoolTx::Http1(_) => false,
			#[cfg(feature = "http2")]
			PoolTx::Http2(_) => true,
		}
	}

	/// Whether the connection was flagged unusable via the connector's
	/// poison marker.
	fn is_poisoned(&self) -> bool {
		self.conn_info.poisoned.poisoned()
	}

	/// Whether the sender can accept a request right now.
	fn is_ready(&self) -> bool {
		match self.tx {
			#[cfg(feature = "http1")]
			PoolTx::Http1(ref tx) => tx.is_ready(),
			#[cfg(feature = "http2")]
			PoolTx::Http2(ref tx) => tx.is_ready(),
		}
	}
}

impl<B: Body + 'static> PoolClient<B> {
	/// Dispatch `req` on this connection, returning the protocol-specific
	/// send future.
	///
	/// The three `cfg`-gated `return match` blocks below cover every
	/// feature combination; exactly one is compiled in.
	fn try_send_request(
		&mut self,
		req: Request<B>,
	) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>> + use<B>
	where
		B: Send,
	{
		#[cfg(all(feature = "http1", feature = "http2"))]
		return match self.tx {
			#[cfg(feature = "http1")]
			PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
			#[cfg(feature = "http2")]
			PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
		};

		#[cfg(feature = "http1")]
		#[cfg(not(feature = "http2"))]
		return match self.tx {
			#[cfg(feature = "http1")]
			PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
		};

		#[cfg(not(feature = "http1"))]
		#[cfg(feature = "http2")]
		return match self.tx {
			#[cfg(feature = "http2")]
			PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
		};
	}
}

impl<B> pool::Poolable for PoolClient<B>
where
	B: Send + 'static,
{
	/// A connection is reusable only if it is neither poisoned nor busy.
	fn is_open(&self) -> bool {
		!self.is_poisoned() && self.is_ready()
	}

	/// Split this client for pooling: HTTP/1 senders are exclusive, while
	/// HTTP/2 senders are cloneable and can be shared across checkouts.
	fn reserve(self) -> pool::Reservation<Self> {
		match self.tx {
			#[cfg(feature = "http1")]
			PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
				conn_info: self.conn_info,
				tx: PoolTx::Http1(tx),
			}),
			#[cfg(feature = "http2")]
			PoolTx::Http2(tx) => {
				let b = PoolClient {
					conn_info: self.conn_info.clone(),
					tx: PoolTx::Http2(tx.clone()),
				};
				let a = PoolClient {
					conn_info: self.conn_info,
					tx: PoolTx::Http2(tx),
				};
				pool::Reservation::Shared(a, b)
			},
		}
	}

	/// Only HTTP/2 connections can serve multiple requests concurrently.
	fn can_share(&self) -> bool {
		self.is_http2()
	}
}

/// Internal error for `one_connection_for`, distinguishing plain failures
/// from a closed pool checkout (which the caller may retry).
enum ClientConnectError {
	Normal(Error),
	CheckoutIsClosed(pool::Error),
}

/// Rewrite `uri` into origin-form (`/path?query`), as used for direct
/// (non-proxied) HTTP/1 requests. A missing or bare `/` path becomes the
/// default URI.
fn origin_form(uri: &mut Uri) {
	let replacement = uri
		.path_and_query()
		.filter(|pq| pq.as_str() != "/")
		.map(|pq| {
			let mut parts = ::http::uri::Parts::default();
			parts.path_and_query = Some(pq.clone());
			Uri::from_parts(parts).expect("path is valid uri")
		})
		.unwrap_or_else(|| {
			// The default `Uri` is exactly "/".
			debug_assert!(Uri::default() == "/");
			Uri::default()
		});
	*uri = replacement;
}

/// Rewrite `uri` into absolute-form (`scheme://authority/path`), as used
/// for plaintext requests sent through a forward proxy.
fn absolute_form(uri: &mut Uri) {
	debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
	debug_assert!(
		uri.authority().is_some(),
		"absolute_form needs an authority"
	);
	// If the URI is to HTTPS, and the connector claimed to be an agentgateway,
	// then it *should* have tunneled, and so we don't want to send
	// absolute-form in that case.
	if uri.scheme() == Some(&Scheme::HTTPS) {
		origin_form(uri);
	}
}

/// Rewrite `uri` into authority-form (`host:port`), as required for
/// CONNECT requests. Any non-trivial path is dropped with a warning.
fn authority_form(uri: &mut Uri) {
	// `https://hyper.rs` would parse with `/` path, don't
	// annoy people about that...
	if let Some(path) = uri.path_and_query() {
		if path != "/" {
			warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
		}
	}
	let authority = match uri.authority() {
		Some(auth) => auth.clone(),
		None => {
			unreachable!("authority_form with relative uri");
		},
	};
	let mut parts = ::http::uri::Parts::default();
	parts.authority = Some(authority);
	*uri = Uri::from_parts(parts).expect("authority is valid");
}

/// Return the URI's port only when it differs from the scheme's default
/// (443 for secure schemes, 80 otherwise); `None` when absent or default.
fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
	let port = uri.port()?;
	let is_default = matches!(
		(port.as_u16(), is_schema_secure(uri)),
		(443, true) | (80, false)
	);
	if is_default {
		None
	} else {
		Some(port)
	}
}

/// Whether the URI scheme implies TLS (`https` or `wss`).
fn is_schema_secure(uri: &Uri) -> bool {
	matches!(uri.scheme_str(), Some("wss" | "https"))
}

/// A builder to configure a new [`Client`](Client).
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::TokioExecutor;
///
/// let client = Client::builder(TokioExecutor::new())
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
	// High-level request behavior (retries, Host header, version policy).
	client_config: Config,
	// Executor for background connection tasks.
	exec: Exec,
	// Per-connection HTTP/1 handshake options.
	#[cfg(feature = "http1")]
	h1_builder: hyper::client::conn::http1::Builder,
	// Per-connection HTTP/2 handshake options.
	#[cfg(feature = "http2")]
	h2_builder: hyper::client::conn::http2::Builder<Exec>,
	// Idle-pool limits (timeout, max idle per host).
	pool_config: pool::Config,
	// Timer required for idle timeouts to actually fire.
	pool_timer: Option<timer::Timer>,
}

impl Builder {
	/// Construct a new Builder.
	pub fn new<E>(executor: E) -> Self
	where
		E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
	{
		let exec = Exec::new(executor);
		Self {
			client_config: Config {
				retry_canceled_requests: true,
				set_host: true,
				ver: Ver::Auto,
			},
			exec: exec.clone(),
			#[cfg(feature = "http1")]
			h1_builder: hyper::client::conn::http1::Builder::new(),
			#[cfg(feature = "http2")]
			h2_builder: hyper::client::conn::http2::Builder::new(exec),
			pool_config: pool::Config {
				idle_timeout: Some(Duration::from_secs(90)),
				max_idle_per_host: usize::MAX,
			},
			pool_timer: None,
		}
	}
	/// Set an optional timeout for idle sockets being kept-alive.
	/// A `Timer` is required for this to take effect. See `Builder::pool_timer`
	///
	/// Pass `None` to disable timeout.
	///
	/// Default is 90 seconds.
	///
	/// # Example
	///
	/// ```
	/// # #[cfg(feature = "tokio")]
	/// # fn run () {
	/// use std::time::Duration;
	/// use hyper_util::client::legacy::Client;
	/// use hyper_util::rt::{TokioExecutor, TokioTimer};
	///
	/// let client = Client::builder(TokioExecutor::new())
	///     .pool_idle_timeout(Duration::from_secs(30))
	///     .pool_timer(TokioTimer::new())
	///     .build_http();
	///
	/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
	/// # }
	/// # fn main() {}
	/// ```
	pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
	where
		D: Into<Option<Duration>>,
	{
		self.pool_config.idle_timeout = val.into();
		self
	}

	#[doc(hidden)]
	#[deprecated(note = "renamed to `pool_max_idle_per_host`")]
	pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
		self.pool_config.max_idle_per_host = max_idle;
		self
	}

	/// Sets the maximum idle connection per host allowed in the pool.
	///
	/// Default is `usize::MAX` (no limit).
	pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
		self.pool_config.max_idle_per_host = max_idle;
		self
	}

	// HTTP/1 options

	/// Sets the exact size of the read buffer to *always* use.
	///
	/// Note that setting this option unsets the `http1_max_buf_size` option.
	///
	/// Default is an adaptive read buffer.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
		self.h1_builder.read_buf_exact_size(Some(sz));
		self
	}

	/// Set the maximum buffer size for the connection.
	///
	/// Default is ~400kb.
	///
	/// Note that setting this option unsets the `http1_read_exact_buf_size` option.
	///
	/// # Panics
	///
	/// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
		self.h1_builder.max_buf_size(max);
		self
	}

	/// Set whether HTTP/1 connections will accept spaces between header names
	/// and the colon that follow them in responses.
	///
	/// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
	/// parsing.
	///
	/// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
	/// to say about it:
	///
	/// > No whitespace is allowed between the header field-name and colon. In
	/// > the past, differences in the handling of such whitespace have led to
	/// > security vulnerabilities in request routing and response handling. A
	/// > server MUST reject any received request message that contains
	/// > whitespace between a header field-name and colon with a response code
	/// > of 400 (Bad Request). An agentgateway MUST remove any such whitespace from a
	/// > response message before forwarding the message downstream.
	///
	/// Note that this setting does not affect HTTP/2.
	///
	/// Default is false.
	///
	/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
		self
			.h1_builder
			.allow_spaces_after_header_name_in_responses(val);
		self
	}

	/// Set whether HTTP/1 connections will accept obsolete line folding for
	/// header values.
	///
	/// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
	/// to say about it:
	///
	/// > A server that receives an obs-fold in a request message that is not
	/// > within a message/http container MUST either reject the message by
	/// > sending a 400 (Bad Request), preferably with a representation
	/// > explaining that obsolete line folding is unacceptable, or replace
	/// > each received obs-fold with one or more SP octets prior to
	/// > interpreting the field value or forwarding the message downstream.
	///
	/// > An agentgateway or gateway that receives an obs-fold in a response message
	/// > that is not within a message/http container MUST either discard the
	/// > message and replace it with a 502 (Bad Gateway) response, preferably
	/// > with a representation explaining that unacceptable line folding was
	/// > received, or replace each received obs-fold with one or more SP
	/// > octets prior to interpreting the field value or forwarding the
	/// > message downstream.
	///
	/// > A user agent that receives an obs-fold in a response message that is
	/// > not within a message/http container MUST replace each received
	/// > obs-fold with one or more SP octets prior to interpreting the field
	/// > value.
	///
	/// Note that this setting does not affect HTTP/2.
	///
	/// Default is false.
	///
	/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
		self
			.h1_builder
			.allow_obsolete_multiline_headers_in_responses(val);
		self
	}

	/// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
	///
	/// This mimics the behaviour of major browsers. You probably don't want this.
	/// You should only want this if you are implementing an agentgateway whose main
	/// purpose is to sit in front of browsers whose users access arbitrary content
	/// which may be malformed, and they expect everything that works without
	/// the agentgateway to keep working with the agentgateway.
	///
	/// This option will prevent Hyper's client from returning an error encountered
	/// when parsing a header, except if the error was caused by the character NUL
	/// (ASCII code 0), as Chrome specifically always rejects those.
	///
	/// The ignorable errors are:
	/// * empty header names;
	/// * characters that are not allowed in header names, except for `\0` and `\r`;
	/// * when `allow_spaces_after_header_name_in_responses` is not enabled,
	///   spaces and tabs between the header name and the colon;
	/// * missing colon between header name and value;
	/// * characters that are not allowed in header values except for `\0` and `\r`.
	///
	/// If an ignorable error is encountered, the parser tries to find the next
	/// line in the input to resume parsing the rest of the headers. An error
	/// will be emitted nonetheless if it finds `\0` or a lone `\r` while
	/// looking for the next line.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
		self.h1_builder.ignore_invalid_headers_in_responses(val);
		self
	}

	/// Set whether HTTP/1 connections should try to use vectored writes,
	/// or always flatten into a single buffer.
	///
	/// Note that setting this to false may mean more copies of body data,
	/// but may also improve performance when an IO transport doesn't
	/// support vectored writes well, such as most TLS implementations.
	///
	/// Setting this to true will force hyper to use queued strategy
	/// which may eliminate unnecessary cloning on some TLS backends
	///
	/// Default is `auto`. In this mode hyper will try to guess which
	/// mode to use
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
		self.h1_builder.writev(enabled);
		self
	}

	/// Set whether HTTP/1 connections will write header names as title case at
	/// the socket level.
	///
	/// Note that this setting does not affect HTTP/2.
	///
	/// Default is false.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
		self.h1_builder.title_case_headers(val);
		self
	}

	/// Set whether to support preserving original header cases.
	///
	/// Currently, this will record the original cases received, and store them
	/// in a private extension on the `Response`. It will also look for and use
	/// such an extension in any provided `Request`.
	///
	/// Since the relevant extension is still private, there is no way to
	/// interact with the original cases. The only effect this can have now is
	/// to forward the cases in an agentgateway-like fashion.
	///
	/// Note that this setting does not affect HTTP/2.
	///
	/// Default is false.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
		self.h1_builder.preserve_header_case(val);
		self
	}

	/// Set the maximum number of headers.
	///
	/// When a response is received, the parser will reserve a buffer to store headers for optimal
	/// performance.
	///
	/// If client receives more headers than the buffer size, the error "message header too large"
	/// is returned.
	///
	/// The headers are allocated on the stack by default, which has higher performance. After
	/// setting this value, headers will be allocated in heap memory, that is, heap memory
	/// allocation will occur for each response, and there will be a performance drop of about 5%.
	///
	/// Note that this setting does not affect HTTP/2.
	///
	/// Default is 100.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
		self.h1_builder.max_headers(val);
		self
	}

	/// Set whether HTTP/0.9 responses should be tolerated.
	///
	/// Default is false.
	#[cfg(feature = "http1")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
	pub fn http09_responses(&mut self, val: bool) -> &mut Self {
		self.h1_builder.http09_responses(val);
		self
	}

	/// Set whether the connection **must** use HTTP/2.
	///
	/// The destination must either allow HTTP2 Prior Knowledge, or the
	/// `Connect` should be configured to use ALPN to upgrade to `h2`
	/// as part of the connection process. This will not make the `Client`
	/// utilize ALPN by itself.
	///
	/// Note that setting this to true prevents HTTP/1 from being allowed.
	///
	/// Default is false.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_only(&mut self, val: bool) -> &mut Self {
		self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
		self
	}

	/// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
	///
	/// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
	/// As of v0.4.0, it is 20.
	///
	/// See <https://github.com/hyperium/hyper/issues/2877> for more information.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_max_pending_accept_reset_streams(
		&mut self,
		max: impl Into<Option<usize>>,
	) -> &mut Self {
		self.h2_builder.max_pending_accept_reset_streams(max.into());
		self
	}

	/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
	/// stream-level flow control.
	///
	/// Passing `None` will do nothing.
	///
	/// If not set, hyper will use a default.
	///
	/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
		self.h2_builder.initial_stream_window_size(sz.into());
		self
	}

	/// Sets the max connection-level flow control for HTTP2
	///
	/// Passing `None` will do nothing.
	///
	/// If not set, hyper will use a default.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
		self.h2_builder.initial_connection_window_size(sz.into());
		self
	}

	/// Sets the initial maximum of locally initiated (send) streams.
	///
	/// This value will be overwritten by the value included in the initial
	/// SETTINGS frame received from the peer as part of a [connection preface].
	///
	/// Passing `None` will do nothing.
	///
	/// If not set, hyper will use a default.
	///
	/// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_initial_max_send_streams(&mut self, initial: impl Into<Option<usize>>) -> &mut Self {
		self.h2_builder.initial_max_send_streams(initial);
		self
	}

	/// Sets whether to use an adaptive flow control.
	///
	/// Enabling this will override the limits set in
	/// `http2_initial_stream_window_size` and
	/// `http2_initial_connection_window_size`.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
		self.h2_builder.adaptive_window(enabled);
		self
	}

	/// Sets the maximum frame size to use for HTTP2.
	///
	/// Passing `None` will do nothing.
	///
	/// If not set, hyper will use a default.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
		self.h2_builder.max_frame_size(sz);
		self
	}

	/// Sets the max size of received header frames for HTTP2.
	///
	/// Default is currently 16KB, but can change.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
		self.h2_builder.max_header_list_size(max);
		self
	}

	/// Sets an interval for HTTP2 Ping frames should be sent to keep a
	/// connection alive.
	///
	/// Pass `None` to disable HTTP2 keep-alive.
	///
	/// Default is currently disabled.
	///
	/// # Cargo Feature
	///
	/// Requires the `tokio` cargo feature to be enabled.
	#[cfg(feature = "tokio")]
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
		self.h2_builder.keep_alive_interval(interval);
		self
	}

	/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
	///
	/// If the ping is not acknowledged within the timeout, the connection will
	/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
	///
	/// Default is 20 seconds.
	///
	/// # Cargo Feature
	///
	/// Requires the `tokio` cargo feature to be enabled.
	#[cfg(feature = "tokio")]
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
		self.h2_builder.keep_alive_timeout(timeout);
		self
	}

	/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
	///
	/// If disabled, keep-alive pings are only sent while there are open
	/// request/responses streams. If enabled, pings are also sent when no
	/// streams are active. Does nothing if `http2_keep_alive_interval` is
	/// disabled.
	///
	/// Default is `false`.
	///
	/// # Cargo Feature
	///
	/// Requires the `tokio` cargo feature to be enabled.
	#[cfg(feature = "tokio")]
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
		self.h2_builder.keep_alive_while_idle(enabled);
		self
	}

	/// Sets the maximum number of HTTP2 concurrent locally reset streams.
	///
	/// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
	/// details.
	///
	/// The default value is determined by the `h2` crate.
	///
	/// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
		self.h2_builder.max_concurrent_reset_streams(max);
		self
	}

	/// Provide a timer to be used for h2
	///
	/// See the documentation of [`h2::client::Builder::timer`] for more
	/// details.
	///
	/// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
	pub fn timer<M>(&mut self, timer: M) -> &mut Self
	where
		M: Timer + Send + Sync + 'static,
	{
		#[cfg(feature = "http2")]
		self.h2_builder.timer(timer);
		self
	}

	/// Provide a timer to be used for timeouts and intervals in connection pools.
	pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
	where
		M: Timer + Clone + Send + Sync + 'static,
	{
		// `timer` is owned here; no need to clone before wrapping it.
		self.pool_timer = Some(timer::Timer::new(timer));
		self
	}

	/// Set the maximum write buffer size for each HTTP/2 stream.
	///
	/// Default is currently 1MB, but may change.
	///
	/// # Panics
	///
	/// The value must be no larger than `u32::MAX`.
	#[cfg(feature = "http2")]
	#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
	pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
		self.h2_builder.max_send_buf_size(max);
		self
	}

	/// Set whether to retry requests that get disrupted before ever starting
	/// to write.
	///
	/// This means a request that is queued, and gets given an idle, reused
	/// connection, and then encounters an error immediately as the idle
	/// connection was found to be unusable.
	///
	/// When this is set to `false`, the related `ResponseFuture` would instead
	/// resolve to an `Error::Cancel`.
	///
	/// Default is `true`.
	#[inline]
	pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
		self.client_config.retry_canceled_requests = val;
		self
	}

	/// Set whether to automatically add the `Host` header to requests.
	///
	/// If true, and a request does not include a `Host` header, one will be
	/// added automatically, derived from the authority of the `Uri`.
	///
	/// Default is `true`.
	#[inline]
	pub fn set_host(&mut self, val: bool) -> &mut Self {
		self.client_config.set_host = val;
		self
	}

	/// Combine the configuration of this builder with a connector to create a `Client`.
	pub fn build<C, B>(&self, connector: C) -> Client<C, B, DefaultPoolKey>
	where
		C: Connect + Clone,
		B: Body + Send,
		B::Data: Send,
	{
		self.build_with_pool_key::<C, B, DefaultPoolKey>(connector)
	}

	/// Combine the configuration of this builder with a connector to create a `Client`, with a custom pooling key.
	/// A function to extract the pool key from the request is required.
	pub fn build_with_pool_key<C, B, PK>(&self, connector: C) -> Client<C, B, PK>
	where
		C: Connect + Clone,
		B: Body + Send,
		B::Data: Send,
		PK: pool::Key,
	{
		let exec = self.exec.clone();
		let timer = self.pool_timer.clone();
		Client {
			config: self.client_config,
			exec: exec.clone(),
			#[cfg(feature = "http1")]
			h1_builder: self.h1_builder.clone(),
			#[cfg(feature = "http2")]
			h2_builder: self.h2_builder.clone(),
			connector,
			pool: pool::Pool::<_, PK>::new(self.pool_config, exec, timer),
		}
	}
}

impl fmt::Debug for Builder {
	/// Shows only the client and pool configuration; the protocol builders
	/// are intentionally omitted.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let mut dbg = f.debug_struct("Builder");
		dbg.field("client_config", &self.client_config);
		dbg.field("pool_config", &self.pool_config);
		dbg.finish()
	}
}

// ==== impl Error ====

impl fmt::Debug for Error {
	/// Renders as a tuple of the error kind, plus the source error when one
	/// is attached.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let mut tuple = f.debug_tuple("hyper_util::client::legacy::Error");
		tuple.field(&self.kind);
		if let Some(cause) = self.source.as_ref() {
			tuple.field(cause);
		}
		tuple.finish()
	}
}

impl fmt::Display for Error {
	/// Displays the error kind, followed by the underlying cause when present.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self.source.as_ref() {
			Some(cause) => write!(f, "{:?}: {}", self.kind, cause),
			None => write!(f, "{:?}", self.kind),
		}
	}
}

impl StdError for Error {
	fn source(&self) -> Option<&(dyn StdError + 'static)> {
		// `as_deref` borrows through the Box; the cast drops the
		// `Send + Sync` auto-trait bounds to match the trait signature.
		self.source.as_deref().map(|cause| cause as _)
	}
}

impl Error {
	/// Returns true if this was an error from `Connect`.
	pub fn is_connect(&self) -> bool {
		matches!(self.kind, ErrorKind::Connect)
	}

	/// Returns the info of the client connection on which this error occurred.
	#[cfg(any(feature = "http1", feature = "http2"))]
	pub fn connect_info(&self) -> Option<&Connected> {
		self.connect_info.as_ref()
	}

	#[cfg(any(feature = "http1", feature = "http2"))]
	fn with_connect_info(self, connect_info: Connected) -> Self {
		Self {
			connect_info: Some(connect_info),
			..self
		}
	}
	fn is_canceled(&self) -> bool {
		matches!(self.kind, ErrorKind::Canceled)
	}

	fn tx(src: hyper::Error) -> Self {
		e!(SendRequest, src)
	}

	fn closed(src: hyper::Error) -> Self {
		e!(ChannelClosed, src)
	}
}
